This notebook can be better viewed on (much better than viewing on GitHub) nbviewer
For further details visit our GitHub Repository
# importing libraries
import random
import numpy as np
import pandas as pd
import seaborn as sns
from pathlib import Path
import lucrum.algo as algo
import matplotlib.pyplot as plt
import lucrum.datareader as ldr
import lucrum.algo.finstats as fs
from collections import OrderedDict
# trading strategies
from lucrum.algo import (SimpleRsiAlgo, CombinedTaAlgo,
MACrossoverAlgo, BuyHoldStrategy,
SimpleWilliamRAlgo, XgbBoostAlgo,
RandomForestAlgo, LogRegAlgo)
# hide warnings
import warnings
warnings.filterwarnings('ignore')
# set parameters for importing datasets
data_path = "data/" # data is stored in the data directory
data_format = ".csv" # files are saved in .csv format
data_source = "binance" # the Binance API will be used to download data
data_interval = "15m" # we will be fetching 15m interval data
data_timezone = "Europe/Malta" # timezone used to convert the data timezone from source
# currency pairs which will be investigated in this study
# we will be investigating 6 different cryptocurrency pairs,
# all of which are against USDT, which is considered a stable coin
# Bitcoin/Tether  17-08-2017 to 01-06-2019
# Ethereum/Tether 17-08-2017 to 01-06-2019
# Ripple/Tether   04-05-2018 to 01-06-2019
# Litecoin/Tether 13-12-2017 to 01-06-2019
# EOS/Tether      28-05-2018 to 01-06-2019
# Stellar/Tether  31-05-2018 to 01-06-2019
currency_pairs = [("BTCUSDT", "17 Aug, 2017", "1 Jun, 2019"),
("ETHUSDT", "17 Aug, 2017", "1 Jun, 2019"),
("XRPUSDT", "4 May, 2018", "1 Jun, 2019"),
("LTCUSDT", "13 Dec, 2017", "1 Jun, 2019"),
("EOSUSDT", "28 May, 2018", "1 Jun, 2019"),
("XLMUSDT", "31 May, 2018", "1 Jun, 2019")]
# ordered dictionary (pair symbol -> price-history dataframe) holding all the data
pairs_dict = OrderedDict()
# Check if the data exists as .csv; if not found in the data path, download
# the 15m data from the Binance API (a new module was created in the lucrum
# library to access APIs such as Binance).
for pair in currency_pairs:
    # unpack the attributes from the pair tuple: symbol, start date, end date
    tmp_pair, tmp_start, tmp_end = pair
    # build the file path from the attributes in the tuple
    tmp_path = "{0}{1}{2}".format(data_path, tmp_pair, data_format)
    my_file = Path(tmp_path)
    if my_file.is_file():  # file exists -> load it from disk
        print("{0} file existed, retrieving data from file.".format(tmp_pair))
        price_history = pd.read_csv(tmp_path)
    else:  # file does not exist -> download the data from the Binance API
        print("{0} file does not exist, downloading from {1}.".format(tmp_pair, data_source))
        price_history = ldr.get_data(source=data_source,
                                     symbols=tmp_pair,
                                     start=tmp_start,
                                     end=tmp_end,
                                     interval=data_interval,
                                     timezone=data_timezone)
        # cache the downloaded dataframe as a csv file for future runs
        price_history.to_csv(tmp_path,
                             index=None,
                             header=True)
    # convert the datetime columns (strip the "+HH:MM" timezone offset before
    # parsing) — this was duplicated verbatim in both branches originally and
    # is hoisted here since it applies either way
    price_history["open_time"] = pd.to_datetime(price_history["open_time"].str.split("+").str[0])
    price_history["close_time"] = pd.to_datetime(price_history["close_time"].str.split("+").str[0])
    # add the dataframe to the dictionary keyed by pair symbol
    pairs_dict[tmp_pair] = price_history
    print(price_history.dtypes)
##################################################################
# Explore the underlying distributions for the log returns,
# and show other descriptive statistics.
##################################################################
ndigits = 6  # decimal places used when rounding the distribution moments
i = 1  # subplot position counter (2x3 grid)
fig = plt.figure(figsize=(17,10))
fig.subplots_adjust(hspace=0.3, wspace=0.2)
# describe the log-return distribution of every dataset
for key, value in pairs_dict.items():
    print("Exploring the following dataset: {}".format(key))
    # log returns: ln(close_t / close_{t-1}); drop the NaN produced by the shift
    tmp_log_returns = np.log(value.close / value.close.shift(1)).dropna()
    # the first four moments of the distribution:
    # mean (1st), standard deviation (2nd), skew (3rd), kurtosis (4th)
    ds_mean, ds_std, ds_skew, ds_kurtosis = algo.dist_moments(tmp_log_returns)
    # report the four distribution moments
    print("\nDistribution Moments "
          + f"\n\tMean (1st): {round(ds_mean, ndigits):.6f}"
          + f"\n\tSTD (2nd): {round(ds_std, ndigits)}"
          + f"\n\tSkew (3rd): {round(ds_skew, ndigits)}"
          + f"\n\tKurtosis (4th): {round(ds_kurtosis, ndigits)}")
    # additional stats from pandas describe
    print("\nDescriptive stats from pandas describe")
    display(tmp_log_returns.describe())
    # density plot for this pair on its own subplot
    ax = fig.add_subplot(2, 3, i)
    ax.title.set_text(key)
    ax.set_ylabel("Density")
    ax.set_xlabel("Log Returns")
    sns.distplot(tmp_log_returns.values, ax=ax)
    i += 1
    print("---------------------------------------------------------")
# render all the subplots at once
print("Distribution plots for all the datasets")
plt.show()
The function below is used to split the dataset, given that the dataset is ordered by time.
We will apply the following splits (for all the datasets)
1) split in-sample and out-sample (80:20)
2) re-split the in-sample to train and validation (80:20)
##################################################################
# Dataset splitting function for Train/Validation/Test Splits
##################################################################
# function to split an ordered time series dataset
def split_dataset(data, split_perc):
    """Splits an ordered time series pandas dataframe.

    Parameters
    ----------
    data: pandas dataframe
        An ordered dataframe (ordered by time ASC).
    split_perc: float
        A float value which specifies the percentage of the split,
        must be between 0 and 1, where 0 and 1 are exclusive.

    Returns
    -------
    pandas dataframe:
        A dataframe split with a total of the split_perc (%) specified.
    pandas dataframe:
        Another dataframe with the remaining data (1 - split_perc (%)).
    """
    # print date range for total samples before the split
    time_from = data.head(1)["open_time"].astype(str).values[0]
    time_to = data.tail(1)["close_time"].astype(str).values[0]
    print("[From {} to {}]".format(time_from, time_to))
    # print total samples before the split
    total_rows = data.shape[0]
    print("Total samples before splitting: {}".format(total_rows))
    # calculate the number of rows that fall in the first split
    # given the percentage specified
    first_split_n = int(total_rows * split_perc)
    # slice the ordered dataframe into the two splits
    first_split = data.iloc[0:first_split_n, :]
    second_split = data.iloc[first_split_n:total_rows, :]
    # percentage of the first split (for reporting)
    first_perc = round(split_perc * 100, 2)
    # print stats for the first split
    time_from = first_split.head(1)["open_time"].astype(str).values[0]
    time_to = first_split.tail(1)["close_time"].astype(str).values[0]
    print("\n[From {} to {} ({}%)]".format(time_from,
                                           time_to,
                                           first_perc))
    print("Total samples in first split: {} ({}%)".format(first_split.shape[0], first_perc))
    # percentage for the second split
    second_perc = 100.00 - first_perc
    # print stats for the second split
    time_from = second_split.head(1)["open_time"].astype(str).values[0]
    time_to = second_split.tail(1)["close_time"].astype(str).values[0]
    print("\n[From {} to {} ({}%)]".format(time_from,
                                           time_to,
                                           second_perc))
    # BUGFIX: this message previously said "first split" for the second split
    print("Total samples in second split: {} ({}%)".format(second_split.shape[0], second_perc))
    # return both splits (first, second)
    return first_split, second_split
##################################################################
# Split the datasets for Train/Validation/Test Splits
##################################################################
# the same two-step split is applied to every dataset:
# 1) split in-sample and out-sample (80:20)
# 2) re-split the in-sample to train and validation (80:20)
split_percentage = 0.80  # 80:20 ratio used for both splits
insample_data = OrderedDict()  # holds the in-sample split per pair
outsample_data = OrderedDict()  # holds the out-sample split per pair
# 1) split in-sample and out-sample (80:20)
print("SPLIT INSAMPLE AND OUTSAMPLE DATASETS")
for key, value in pairs_dict.items():
    print("-----------------------------------------------------")
    print("Splitting in/out samples for: {} \n".format(key))
    # split and store both halves under the pair's key
    insample_data[key], outsample_data[key] = split_dataset(value, split_percentage)
    print("-----------------------------------------------------\n")
train_data = OrderedDict()  # holds the training split per pair
validation_data = OrderedDict()  # holds the validation split per pair
# 2) re-split the in-sample to train and validation (80:20)
print("SPLIT TRAINING AND VALIDATION DATASETS")
for key, value in insample_data.items():
    print("-----------------------------------------------------")
    print("Splitting train/validation samples for: {} \n".format(key))
    # split and store both halves under the pair's key
    train_data[key], validation_data[key] = split_dataset(value, split_percentage)
    print("-----------------------------------------------------\n")
##################################################################
# Common Strategies Params / Dictionaries to hold tuned params
# for TA indicators
##################################################################
# set fee for each trade (based on Binance Fees as of June 2019)
# General: 0.1% trading fee.
trading_fee = 0.001 # 0.1% per trade executed
# we only show the first n and last n points when plotting the positions,
# as there are many points since we are using the 15m interval
position_plot_n = 500
# to reproduce results where randomness is used
random_state = 42
rng = np.random.RandomState(random_state)
# technical indicators will be tuned via brute force / exhaustive search;
# the dictionaries below keep a reference to the best tuned parameters so
# they can be re-used at a later stage when applying the ML/AI models
# create an ordered dictionary to hold best for EMA, tuples of (Lead,Lag)
best_ema = OrderedDict()
# create an ordered dictionary to hold best for SMA, tuples of (Lead,Lag)
best_sma = OrderedDict()
# create an ordered dictionary to hold best for RSI, tuples of (Window,Upper,Lower)
best_rsi = OrderedDict()
# create an ordered dictionary to hold best for WilliamR, tuples of (Window,Overbought,Oversold)
best_willr = OrderedDict()
# create an ordered dictionary to hold best for NATR, tuples of (Window,Volatility Threshold)
best_natr = OrderedDict()
# create an ordered dictionary to hold best for XGBClassifier,
# tuples of (Lagged Features, Max Depth, Confidence Level, Trained Classifier)
best_xgb = OrderedDict()
# create an ordered dictionary to hold best for RandomForest,
# tuples of (Lagged Features, Max Depth, Confidence Level, Trained Classifier)
best_rf = OrderedDict()
# create an ordered dictionary to hold best for Logistic Regression,
# tuples of (Lagged Features, Confidence Level, Trained Classifier)
best_logreg = OrderedDict()
This is the simplest method which will be tested: simply buying an asset and holding it through the whole period. For this strategy we assume that one trade has been executed, at time $t_0$ on the test dataset (when buying the asset for the first time), so we will only apply the trading fee once.
##################################################################
# Buy n Hold Strategy (BASELINE MODEL)
##################################################################
# instantiate the baseline buy-and-hold strategy
buynhold = BuyHoldStrategy()
# evaluate the baseline on every out-of-sample (test) dataset
for key, value in outsample_data.items():
    # report which pair is being tested
    print("\nApplying Buy n Hold strategy for: {}\n".format(key))
    # work on a copy so the stored test split is left untouched
    current_pair = value.copy()
    # a single position is opened at t0 and held throughout
    buynhold.gen_positions(data=current_pair)
    # compute profit/loss; the trading fee applies once, on the entry trade
    buynhold.evaluate(data=current_pair, trading_fee=trading_fee)
    # report performance statistics
    buynhold.stats_perf(data=current_pair)
    # index by open time so the plots use a datetime axis
    current_pair.set_index("open_time", inplace=True)
    # plot the positions taken and the equity curve
    buynhold.plot_pos(data=current_pair)
    buynhold.plot_perf(data=current_pair)
    print("-------------------------------END--------------------------------------------------------")
We decided to test out 3 different types of technical indicators. The following are the types of indicators we will be testing:
1) Trend Indicators 2) Momentum Indicators 3) Volatility Indicators
NOTE: Sometimes trend and momentum indicators are used interchangeably, meaning some may refer to trend indicators as momentum indicators and some may refer to momentum indicators as trend indicators.
NOTE: Sometimes volatility and volume indicators are used interchangeably, meaning some may refer to volatility indicators as volume indicators and some may refer to volume indicators as volatility indicators.
For each type we have chosen to investigate the following technical indicators:
1) Trend Indicator
2) Momentum Indicator
3) Volatility Indicator
So in total we will make use of 7 technical indicators. Now we will test these standard technical indicator strategies, each indicator with its own strategy.
We will be testing the following baseline models apart from the Buy and Hold:
This strategy is one of the simplest strategies used with an Exponential Moving Average (EMA) indicator or any other moving average indicator. We take the following positions based on the following rules:
If EMA_shortwindow > EMA_longwindow
then OpenLongPosition
else
OpenShortPosition
This strategy is usually referred to as the moving average crossover strategy.
NOTE: A fee is applied when you get in a long position, exit a long position, get in a short position or exit a short position. Basically a fee is applied per trade.
##################################################################
# Exponential Moving Average Crossover Strategy (BASELINE MODEL)
##################################################################
# create a new instance for the moving average crossover strategy
macrossover = MACrossoverAlgo()
# tune the EMA crossover windows via brute force / exhaustive search
# on the in-sample data, maximising the (annualised) sharpe ratio
for key, value in insample_data.items():
    # report which pair is being tuned
    print("Tuning EMA strategy using brute force for {}".format(key))
    # work on a copy so repeated feature generation never mutates the stored split
    current_pair = insample_data[key].copy()
    # track the best parameters found so far
    best_lead = 0
    best_lag = 0
    best_sharpe = -float("inf")  # start at -inf so the first result becomes the best
    # try different window combinations for the lead and lag moving averages
    for i in range(5, 50, 5):  # lead window (steps of 5)
        for j in range(20, 80, 5):  # lag window (steps of 5)
            # the lead window must be shorter than the lag window
            if j > i:
                # NOTE: some traders mix EMA for the lead with SMA for the lag;
                # MACrossoverAlgo supports that via the 'ema'/'sma' type
                # arguments — here we use 'ema' for both windows
                macrossover.gen_features(data=current_pair,
                                         lead="ema",  # EMA type for the lead
                                         lead_t=i,    # window for short/lead
                                         lag="ema",   # EMA type for the lag
                                         lag_t=j)     # window for long/lag
                # generate positions based on the crossover:
                # '1' = long position, '-1' = short position
                macrossover.gen_positions(data=current_pair)
                # evaluate positions by calculating profit and loss
                macrossover.evaluate(data=current_pair, trading_fee=trading_fee)
                # sharpe ratio annualised with 96 periods/day (15m bars)
                # and 365 days, since the crypto market never closes
                sharpe_ratio = algo.sharpe_ratio(96*365, current_pair.pl)
                # keep the parameters if this is a new best
                if sharpe_ratio > best_sharpe:
                    best_lead = i
                    best_lag = j
                    best_sharpe = sharpe_ratio
    # with tuning done, evaluate the best parameters on the test set
    current_pair = outsample_data[key].copy()
    macrossover.gen_features(data=current_pair,
                             lead="ema",
                             lead_t=best_lead,  # best window for short/lead
                             lag="ema",
                             lag_t=best_lag)    # best window for long/lag
    macrossover.gen_positions(data=current_pair)
    macrossover.evaluate(data=current_pair, trading_fee=trading_fee)
    # report the best windows found
    print("[BEST] Lead({}) / Lag({})\n".format(best_lead, best_lag))
    # report performance statistics
    macrossover.stats_perf(data=current_pair)
    # index by open time so the plots use a datetime axis
    current_pair.set_index("open_time", inplace=True)
    # only the first and last 500 positions are plotted, as the 15m interval
    # yields many points (BUGFIX: corrected 'poistions' typo in the messages)
    print("\nFirst 500 positions taken for: {}".format(key))
    macrossover.plot_pos(data=current_pair.head(position_plot_n))
    macrossover.plot_perf(data=current_pair.head(position_plot_n))
    print("Last 500 positions taken for: {}".format(key))
    macrossover.plot_pos(data=current_pair.tail(position_plot_n))
    macrossover.plot_perf(data=current_pair.tail(position_plot_n))
    # plot performance for the whole date range
    print("Equity Curve for the whole date range for: {}".format(key))
    macrossover.plot_perf(data=current_pair)
    # remember the best windows for the later ML/AI feature generation
    best_ema[key] = (best_lead, best_lag)
    print("-------------------------------END--------------------------------------------------------")
This strategy is one of the simplest strategies used with a Simple Moving Average (SMA) indicator or any other moving average indicator. Unlike the EMA, this indicator gives equal weight to all observations. The EMA is quicker to react to price changes while the SMA reacts more slowly; a fast-reacting EMA can cause a trader to exit a trade on a market hiccup, while the slower-moving SMA keeps the trader in the trade, resulting in a bigger profit after the hiccup is finished. The opposite can also happen, which is why we will test both indicators.
We take the following positions based on the following rules:
If SMA_shortwindow > SMA_longwindow
then OpenLongPosition
else
OpenShortPosition
NOTE: A fee is applied when you get in a long position, exit a long position, get in a short position or exit a short position. Basically a fee is applied per trade.
##################################################################
# Simple Moving Average Crossover Strategy (BASELINE MODEL)
##################################################################
# tune the SMA crossover windows via brute force / exhaustive search
# on the in-sample data, maximising the (annualised) sharpe ratio;
# this reuses the MACrossoverAlgo instance created for the EMA strategy
for key, value in insample_data.items():
    # report which pair is being tuned
    print("Tuning SMA strategy using brute force for {}".format(key))
    # work on a copy so repeated feature generation never mutates the stored split
    current_pair = insample_data[key].copy()
    # track the best parameters found so far
    best_lead = 0
    best_lag = 0
    best_sharpe = -float("inf")  # start at -inf so the first result becomes the best
    # try different window combinations for the lead and lag moving averages
    for i in range(5, 50, 5):  # lead window (steps of 5)
        for j in range(20, 80, 5):  # lag window (steps of 5)
            # the lead window must be shorter than the lag window
            if j > i:
                # FIX: comments previously claimed 'ema' was used here —
                # this strategy uses 'sma' for both windows
                macrossover.gen_features(data=current_pair,
                                         lead="sma",  # SMA type for the lead
                                         lead_t=i,    # window for short/lead
                                         lag="sma",   # SMA type for the lag
                                         lag_t=j)     # window for long/lag
                # generate positions based on the crossover:
                # '1' = long position, '-1' = short position
                macrossover.gen_positions(data=current_pair)
                # evaluate positions by calculating profit and loss
                macrossover.evaluate(data=current_pair, trading_fee=trading_fee)
                # sharpe ratio annualised with 96 periods/day (15m bars)
                # and 365 days, since the crypto market never closes
                sharpe_ratio = algo.sharpe_ratio(96*365, current_pair.pl)
                # keep the parameters if this is a new best
                if sharpe_ratio > best_sharpe:
                    best_lead = i
                    best_lag = j
                    best_sharpe = sharpe_ratio
    # with tuning done, evaluate the best parameters on the test set
    current_pair = outsample_data[key].copy()
    macrossover.gen_features(data=current_pair,
                             lead="sma",
                             lead_t=best_lead,  # best window for short/lead
                             lag="sma",
                             lag_t=best_lag)    # best window for long/lag
    macrossover.gen_positions(data=current_pair)
    macrossover.evaluate(data=current_pair, trading_fee=trading_fee)
    # report the best windows found
    print("[BEST] Lead({}) / Lag({})\n".format(best_lead, best_lag))
    # report performance statistics
    macrossover.stats_perf(data=current_pair)
    # index by open time so the plots use a datetime axis
    current_pair.set_index("open_time", inplace=True)
    # only the first and last 500 positions are plotted, as the 15m interval
    # yields many points (BUGFIX: corrected 'poistions' typo in the messages)
    print("\nFirst 500 positions taken for: {}".format(key))
    macrossover.plot_pos(data=current_pair.head(position_plot_n))
    macrossover.plot_perf(data=current_pair.head(position_plot_n))
    print("Last 500 positions taken for: {}".format(key))
    macrossover.plot_pos(data=current_pair.tail(position_plot_n))
    macrossover.plot_perf(data=current_pair.tail(position_plot_n))
    # plot performance for the whole date range
    print("Equity Curve for the whole date range for: {}".format(key))
    macrossover.plot_perf(data=current_pair)
    # remember the best windows for the later ML/AI feature generation
    best_sma[key] = (best_lead, best_lag)
    print("-------------------------------END--------------------------------------------------------")
Now we will test the RSI. This indicator is a momentum oscillator that measures the speed and change of price movements. The RSI oscillates between zero and 100. Traditionally the RSI is considered overbought when above 70 and oversold when below 30.
If RSI > upper
then OpenShortPosition
else if RSI < lower
OpenLongPosition
else
Sit/Hold
NOTE: A fee is applied when you get in a long position, exit a long position, get in a short position or exit a short position. Basically a fee is applied per trade.
##################################################################
# Simple RSI Strategy overbought/oversold (BASELINE MODEL)
##################################################################
# create a new instance for the RSI strategy
simplersi = SimpleRsiAlgo()
# tune the RSI window and overbought/oversold bounds via brute force
# on the in-sample data, maximising the (annualised) sharpe ratio
for key, value in insample_data.items():
    # report which pair is being tuned
    print("Tuning RSI strategy using brute force for {}".format(key))
    # work on a copy so repeated feature generation never mutates the stored split
    current_pair = insample_data[key].copy()
    # track the best parameters found so far
    best_window = 0
    best_upper = 0
    best_lower = 0
    best_sharpe = -float("inf")  # start at -inf so the first result becomes the best
    # try different window sizes and bound combinations
    for i in range(2, 20, 4):  # window size (steps +4)
        for j in range(50, 95, 5):  # upper bound rsi (overbought) (steps +5)
            for k in range(5, 55, 5):  # lower bound rsi (oversold) (steps +5)
                # the upper bound must be greater than or equal to the lower bound
                if j >= k:
                    # generate the RSI feature using the current window
                    simplersi.gen_features(data=current_pair, window=i)
                    # generate positions from the overbought/oversold bounds:
                    # '1' = long position, '-1' = short position
                    simplersi.gen_positions(data=current_pair, upper=j, lower=k)
                    # evaluate positions by calculating profit and loss
                    simplersi.evaluate(data=current_pair, trading_fee=trading_fee)
                    # sharpe ratio annualised with 96 periods/day (15m bars)
                    # and 365 days, since the crypto market never closes
                    sharpe_ratio = algo.sharpe_ratio(96*365, current_pair.pl)
                    # keep the parameters if this is a new best
                    if sharpe_ratio > best_sharpe:
                        best_window = i
                        best_upper = j
                        best_lower = k
                        best_sharpe = sharpe_ratio
    # with tuning done, evaluate the best parameters on the test set
    current_pair = outsample_data[key].copy()
    # generate the RSI feature using the best window
    simplersi.gen_features(data=current_pair, window=best_window)
    simplersi.gen_positions(data=current_pair, upper=best_upper, lower=best_lower)
    simplersi.evaluate(data=current_pair, trading_fee=trading_fee)
    # report the best parameters found
    print("[BEST] Window({}) Upper({}) / Lower({})\n".format(best_window, best_upper, best_lower))
    # report performance statistics
    simplersi.stats_perf(data=current_pair)
    # index by open time so the plots use a datetime axis
    current_pair.set_index("open_time", inplace=True)
    # only the first and last 500 positions are plotted, as the 15m interval
    # yields many points (BUGFIX: corrected 'poistions' typo in the messages)
    print("\nFirst 500 positions taken for: {}".format(key))
    simplersi.plot_pos(data=current_pair.head(position_plot_n))
    simplersi.plot_perf(data=current_pair.head(position_plot_n))
    print("Last 500 positions taken for: {}".format(key))
    simplersi.plot_pos(data=current_pair.tail(position_plot_n))
    simplersi.plot_perf(data=current_pair.tail(position_plot_n))
    # plot performance for the whole date range
    print("Equity Curve for the whole date range for: {}".format(key))
    simplersi.plot_perf(data=current_pair)
    # remember the best parameters for the later ML/AI feature generation
    best_rsi[key] = (best_window, best_upper, best_lower)
    print("-------------------------------END--------------------------------------------------------")
Now we will test the William%R. This indicator is a momentum oscillator that measures the speed and change of price movements very similar to RSI. Unlike RSI this indicator gives a value between 0 and -100. Traditionally the William %R is considered overbought when above -20 and oversold when below -80.
If William%R > overbought
then OpenShortPosition
else if William%R < oversold
OpenLongPosition
else
Sit/Hold
NOTE: A fee is applied when you get in a long position, exit a long position, get in a short position or exit a short position. Basically a fee is applied per trade.
##################################################################
# Simple William %R Strategy overbought/oversold (BASELINE MODEL)
##################################################################
# create a new instance for the William%R strategy
simplewillr = SimpleWilliamRAlgo()
# tune the William %R window and overbought/oversold levels via brute force
# on the in-sample data, maximising the (annualised) sharpe ratio
for key, value in insample_data.items():
    # report which pair is being tuned
    print("Tuning William%R strategy using brute force for {}".format(key))
    # work on a copy so repeated feature generation never mutates the stored split
    current_pair = insample_data[key].copy()
    # track the best parameters found so far
    best_window = 0
    best_overbought = 0
    best_oversold = 0
    best_sharpe = -float("inf")  # start at -inf so the first result becomes the best
    # try different window sizes and level combinations (William %R levels are
    # negative, so j and k are negated when used)
    for i in range(2, 20, 4):  # window size (steps +4)
        for j in range(5, 50, 5):  # overbought level magnitude (steps +5)
            for k in range(50, 95, 5):  # oversold level magnitude (steps +5)
                # the (negated) overbought level must be >= the oversold level
                if -j >= -k:
                    # generate the William%R feature using the current window
                    simplewillr.gen_features(data=current_pair, window=i)
                    # generate positions from the overbought/oversold levels:
                    # '1' = long position, '-1' = short position
                    simplewillr.gen_positions(data=current_pair, overbought= -j, oversold= -k)
                    # evaluate positions by calculating profit and loss
                    simplewillr.evaluate(data=current_pair, trading_fee=trading_fee)
                    # sharpe ratio annualised with 96 periods/day (15m bars)
                    # and 365 days, since the crypto market never closes
                    sharpe_ratio = algo.sharpe_ratio(96*365, current_pair.pl)
                    # keep the parameters if this is a new best
                    if sharpe_ratio > best_sharpe:
                        best_window = i
                        best_overbought = -j
                        best_oversold = -k
                        best_sharpe = sharpe_ratio
    # with tuning done, evaluate the best parameters on the test set
    current_pair = outsample_data[key].copy()
    # generate the William%R feature using the best window
    simplewillr.gen_features(data=current_pair, window=best_window)
    simplewillr.gen_positions(data=current_pair, overbought=best_overbought, oversold=best_oversold)
    simplewillr.evaluate(data=current_pair, trading_fee=trading_fee)
    # report the best parameters found
    print("[BEST] Window({}) OverBought({}) / OverSold({})\n".format(best_window, best_overbought, best_oversold))
    # report performance statistics
    simplewillr.stats_perf(data=current_pair)
    # index by open time so the plots use a datetime axis
    current_pair.set_index("open_time", inplace=True)
    # only the first and last 500 positions are plotted, as the 15m interval
    # yields many points (BUGFIX: corrected 'poistions' typo in the messages)
    print("\nFirst 500 positions taken for: {}".format(key))
    simplewillr.plot_pos(data=current_pair.head(position_plot_n))
    simplewillr.plot_perf(data=current_pair.head(position_plot_n))
    print("Last 500 positions taken for: {}".format(key))
    simplewillr.plot_pos(data=current_pair.tail(position_plot_n))
    simplewillr.plot_perf(data=current_pair.tail(position_plot_n))
    # plot performance for the whole date range
    print("Equity Curve for the whole date range for: {}".format(key))
    simplewillr.plot_perf(data=current_pair)
    # remember the best parameters for the later ML/AI feature generation
    best_willr[key] = (best_window, best_overbought, best_oversold)
    print("-------------------------------END--------------------------------------------------------")
Now we will combine all the indicators and add NATR, which is an indicator that measures volatility.
The best tuned parameters found in the previous experiments will be used. We will apply the following rules, which take a position based on the signals generated from multiple indicators (using the same strategies used in the previous experiments). In addition we will also add a volatility measure using NATR. Since NATR does not give us a direction/trend we could not test it on its own. The NATR/volatility threshold will be tuned with the same brute force technique used in the previous experiments.
If (((EMA and SMA) == GoLong) or ((RSI and William%R) == GoLong)) and (NATR > volatility_threshold)
then OpenLongPosition
else if (((EMA and SMA) == GoShort) or ((RSI and William%R) == GoShort)) and (NATR > volatility_threshold)
OpenShortPosition
else
Sit/Hold
NOTE: A fee is applied when you get in a long position, exit a long position, get in a short position or exit a short position. Basically a fee is applied per trade.
#############################################################################
# Combined Technical Indicators + NATR/Volatility Indicator (BASELINE MODEL)
#############################################################################
# create a new instance of the combined technical-indicator strategy, which
# is fed the best EMA/SMA/RSI/WillR parameters tuned in the experiments above
combinedTaAlgo = CombinedTaAlgo()
# tune combined TA (NATR Parameters) ('brute force method')
# for all the pairs found in the dataset (in sample)
# we are tuning on sharpe ratio parameter
# grid search: NATR window 2..18 (step +4) x volatility threshold 0.1..0.8 (step +0.1)
for key, value in insample_data.items():
    # print the pair which is being tuned
    print("Tuning NATR in combined TA strategy using brute force for {}".format(key))
    # get best params for the indicators found in previous experiments
    ema_params = best_ema[key] # EMA parameters (Lead, Lag)
    ema_lead = ema_params[0] # get EMA Lead Window
    ema_lag = ema_params[1] # get EMA Lag Window
    sma_params = best_sma[key] # SMA parameters (Lead, Lag)
    sma_lead = sma_params[0] # get SMA Lead Window
    sma_lag = sma_params[1] # get SMA Lag Window
    rsi_params = best_rsi[key] # RSI parameters (Window, Upper, Lower)
    rsi_window = rsi_params[0] # get RSI window
    rsi_upper = rsi_params[1] # get Upper (Overbought)
    rsi_lower = rsi_params[2] # get Lower (Oversold)
    willr_params = best_willr[key] # William%R parameters (Window, overbought, oversold)
    willr_window = willr_params[0] # get WillR window
    willr_overbought = willr_params[1] # get WillR overbought
    willr_oversold = willr_params[2] # get WillR oversold
    # print parameters used for each indicator, best from all the previous tuning
    print("\nParams for Indicators [{}]\n---------------------------------------------".format(key))
    print("EMA Paramaters: Lead({}) Lag({})".format(ema_lead, ema_lag))
    print("SMA Paramaters: Lead({}) Lag({})".format(sma_lead, sma_lag))
    print("RSI Paramaters: Window({}) Overbought({}) Oversold({})".format(rsi_window, rsi_upper, rsi_lower))
    print("WillR Paramaters: Window({}) Overbought({}) Oversold({})".format(willr_window,
                                                                            willr_overbought,
                                                                            willr_oversold))
    print("---------------------------------------------\n")
    # make copy of dataframe so tuning never mutates the stored in-sample data
    current_pair = insample_data[key].copy()
    # setup initial values
    best_window = 0
    best_vol = 0
    best_sharpe = -float("inf") # start at -inf so the first evaluated combination always becomes the best
    # loop to try different window
    for i in range(2, 20, 4): # window size (steps +4)
        for j in np.arange(0.1, .9, 0.1): # volatility threshold (steps +0.1)
            #print("{} {}".format(i, j))
            # generate features for combined TA using the current window
            # NOTE: gen_features mutates current_pair in place, overwriting
            # the feature columns on every iteration
            combinedTaAlgo.gen_features(data=current_pair,
                                        ema_lead=ema_lead,
                                        ema_lag=ema_lag,
                                        sma_lead=sma_lead,
                                        sma_lag=sma_lag,
                                        rsi_window=rsi_window,
                                        willr_window=willr_window,
                                        natr_window=i)
            # generates positions based on combined signals
            # a value of '1' indicates a long position, while a value of '-1' indicates a short position
            combinedTaAlgo.gen_positions(data=current_pair,
                                         rsi_overbought=rsi_upper,
                                         rsi_oversold=rsi_lower,
                                         willr_overbought=willr_overbought,
                                         willr_oversold=willr_oversold,
                                         volatility=j)
            # evaluate positions by calculating profit and loss
            combinedTaAlgo.evaluate(data=current_pair, trading_fee=trading_fee)
            # calculate sharpe ratio
            # 96 (15 minutes in a day) and 365 days for the crypto market
            # we compute the sharpe ratio based on profit and loss
            sharpe_ratio = algo.sharpe_ratio(96*365, current_pair.pl)
            # check if we found a new best
            if sharpe_ratio > best_sharpe:
                best_window = i
                best_vol = j
                best_sharpe = sharpe_ratio
    # once we found the best parameters
    # test on the test set (out-of-sample data)
    current_pair = outsample_data[key].copy()
    # generate features for combined TA with best NATR window
    combinedTaAlgo.gen_features(data=current_pair,
                                ema_lead=ema_lead,
                                ema_lag=ema_lag,
                                sma_lead=sma_lead,
                                sma_lag=sma_lag,
                                rsi_window=rsi_window,
                                willr_window=willr_window,
                                natr_window=best_window)
    # generate positions for best and print statistics for strategy
    combinedTaAlgo.gen_positions(data=current_pair,
                                 rsi_overbought=rsi_upper,
                                 rsi_oversold=rsi_lower,
                                 willr_overbought=willr_overbought,
                                 willr_oversold=willr_oversold,
                                 volatility=best_vol)
    combinedTaAlgo.evaluate(data=current_pair, trading_fee=trading_fee)
    # print best params
    print("[BEST] Window({}) NATR Threshold({})\n".format(best_window, best_vol))
    # print performance
    combinedTaAlgo.stats_perf(data=current_pair)
    # set open time as index for plots
    current_pair.set_index("open_time", inplace=True)
    # for the position plots we only show the first and last 500
    print("\nFirst 500 poistions taken for: {}".format(key))
    combinedTaAlgo.plot_pos(data=current_pair.head(position_plot_n))
    combinedTaAlgo.plot_perf(data=current_pair.head(position_plot_n))
    print("Last 500 poistions taken for: {}".format(key))
    combinedTaAlgo.plot_pos(data=current_pair.tail(position_plot_n))
    combinedTaAlgo.plot_perf(data=current_pair.tail(position_plot_n))
    # plot performance for the whole range
    print("Equity Curve for the whole date range for: {}".format(key))
    combinedTaAlgo.plot_perf(data=current_pair)
    # add the best to our best collection (window, volatility threshold)
    best_natr[key] = (best_window, best_vol)
    print("-------------------------------END--------------------------------------------------------")
###############################################################################
# XGBoost Classifier Strategy (Proposed Model #1)
#
# We will tune the following hyperparameters using brute force:
# - Lagged Features
# - Max Depth
# - Confidence Level
#
###############################################################################
# tune hyperparameters ('brute force method')
# for all the pairs found in the dataset (training dataset)
# we are tuning on sharpe ratio parameter on the validation
for key, value in train_data.items():
    # print the pair which is being tuned
    print("Tuning hyperparams for XGBoost strategy using brute force for {}".format(key))
    # get best params for the indicators found in previous experiments
    # for this model we do not need all the best params
    # e.g: we will not need overbought/oversold for WillR
    ema_params = best_ema[key] # EMA parameters (Lead, Lag)
    ema_lead = ema_params[0] # get EMA Lead Window
    ema_lag = ema_params[1] # get EMA Lag Window
    sma_params = best_sma[key] # SMA parameters (Lead, Lag)
    sma_lead = sma_params[0] # get SMA Lead Window
    sma_lag = sma_params[1] # get SMA Lag Window
    rsi_params = best_rsi[key] # RSI parameters (Window, Upper, Lower)
    rsi_window = rsi_params[0] # get RSI window
    willr_params = best_willr[key] # William%R parameters (Window, overbought, oversold)
    willr_window = willr_params[0] # get WillR window
    natr_params = best_natr[key] # NATR parameters (Window, Volatility Threshold)
    natr_window = natr_params[0]
    # setup initial values
    best_lagged = 0
    best_max_depth = 0
    best_confidence_level = 0
    best_trained_xgb = None
    best_sharpe = -float("inf") # start at -inf so the first evaluated combination always becomes the best
    # create a new instance for XGBoost Algo
    xgbBoostAlgo = XgbBoostAlgo()
    # lagged features - limited to 2-4 as we don't want to end up with high dimensions (+2 steps)
    for i in range(2, 6, 2):
        # max_depth - limited between 4 and 8 to avoid overfitting (+2 steps)
        for j in range(4, 10, 2):
            # confidence level - limited between .55 and .75,
            # as we don't want it to be too high and lose opportunities (+0.1 steps)
            for k in np.arange(0.55, .85, 0.1):
                # print current test
                print("Testing Lagged Features({}) Max Depth({}) Confidence Level({})". format(i, j, k))
                # make copy of dataframe
                train_set = insample_data[key].copy()
                # generate features for XGBboost
                feature_names = xgbBoostAlgo.gen_features(data=train_set,
                                                          ema_lead=ema_lead,
                                                          ema_lag=ema_lag,
                                                          sma_lead=sma_lead,
                                                          sma_lag=sma_lag,
                                                          rsi_window=rsi_window,
                                                          willr_window=willr_window,
                                                          natr_window=natr_window,
                                                          lagged_features=i,
                                                          create_y=True)
                # train algorithm
                xgb_clf = xgbBoostAlgo.train_algo(data=train_set,
                                                  features=feature_names,
                                                  max_depth=j,
                                                  random_state=random_state)
                # evaluate positions by calculating profit and loss
                # evaluation is done on the validation set
                validation_set = validation_data[key].copy()
                # must make sure the validation set has all the features
                # (create_y=False: no labels needed, we only predict here)
                xgbBoostAlgo.gen_features(data=validation_set,
                                          ema_lead=ema_lead,
                                          ema_lag=ema_lag,
                                          sma_lead=sma_lead,
                                          sma_lag=sma_lag,
                                          rsi_window=rsi_window,
                                          willr_window=willr_window,
                                          natr_window=natr_window,
                                          lagged_features=i,
                                          create_y=False)
                # generates positions
                xgbBoostAlgo.gen_positions(data=validation_set,
                                           clf=xgb_clf,
                                           features=feature_names,
                                           confidence=k)
                # evaluate model
                xgbBoostAlgo.evaluate(data=validation_set, trading_fee=trading_fee)
                # calculate sharpe ratio
                # 96 (15 minutes in a day) and 365 days for the crypto market
                # we compute the sharpe ratio based on profit and loss
                sharpe_ratio = algo.sharpe_ratio(96*365, validation_set.pl)
                # check if we found a new best
                if sharpe_ratio > best_sharpe:
                    best_lagged = i
                    best_max_depth = j
                    best_confidence_level = k
                    best_trained_xgb = xgb_clf
                    best_sharpe = sharpe_ratio
    # take reference of best (incl. the fitted classifier itself)
    best_xgb[key] = (best_lagged, best_max_depth, best_confidence_level, best_trained_xgb)
    # once we found the best parameters
    # test on the test set
    test_set = outsample_data[key].copy()
    # must make sure the test set has all the features
    feature_names = xgbBoostAlgo.gen_features(data=test_set,
                                              ema_lead=ema_lead,
                                              ema_lag=ema_lag,
                                              sma_lead=sma_lead,
                                              sma_lag=sma_lag,
                                              rsi_window=rsi_window,
                                              willr_window=willr_window,
                                              natr_window=natr_window,
                                              lagged_features=best_lagged,
                                              create_y=True)
    # show feature importance
    xgbBoostAlgo.show_feature_importance(feature_names, best_trained_xgb)
    # generates positions
    xgbBoostAlgo.gen_positions(data=test_set,
                               clf=best_trained_xgb,
                               features=feature_names,
                               confidence=best_confidence_level)
    # evaluate model
    xgbBoostAlgo.evaluate(data=test_set, trading_fee=trading_fee)
    # print best params
    print("[BEST] Lagged Features ({}) Max Depth ({}) Confidence Level ({})\n".format(best_lagged,
                                                                                      best_max_depth,
                                                                                      best_confidence_level))
    # print performance
    xgbBoostAlgo.stats_perf(data=test_set)
    xgbBoostAlgo.evaluate_classifier(best_trained_xgb, test_set[feature_names], test_set["y"])
    # set open time as index for plots
    test_set.set_index("open_time", inplace=True)
    # for the position plots we only show the first and last 500
    print("\nFirst 500 poistions taken for: {}".format(key))
    xgbBoostAlgo.plot_pos(data=test_set.head(position_plot_n))
    xgbBoostAlgo.plot_perf(data=test_set.head(position_plot_n))
    print("Last 500 poistions taken for: {}".format(key))
    xgbBoostAlgo.plot_pos(data=test_set.tail(position_plot_n))
    xgbBoostAlgo.plot_perf(data=test_set.tail(position_plot_n))
    # plot performance for the whole range
    print("Equity Curve for the whole date range for: {}".format(key))
    xgbBoostAlgo.plot_perf(data=test_set)
    print("-------------------------------END--------------------------------------------------------")
###############################################################################
# Random Forest Classifier Strategy (Proposed Model #2)
#
# We will tune the following hyperparameters using brute force:
# - Lagged Features
# - Max Depth
# - Confidence Level
#
###############################################################################
# tune hyperparameters ('brute force method')
# for all the pairs found in the dataset (training dataset)
# we are tuning on sharpe ratio parameter on the validation
for key, value in train_data.items():
    # print the pair which is being tuned
    print("Tuning hyperparams for RandomForest strategy using brute force for {}".format(key))
    # get best params for the indicators found in previous experiments
    # for this model we do not need all the best params
    # e.g: we will not need overbought/oversold for WillR
    ema_params = best_ema[key] # EMA parameters (Lead, Lag)
    ema_lead = ema_params[0] # get EMA Lead Window
    ema_lag = ema_params[1] # get EMA Lag Window
    sma_params = best_sma[key] # SMA parameters (Lead, Lag)
    sma_lead = sma_params[0] # get SMA Lead Window
    sma_lag = sma_params[1] # get SMA Lag Window
    rsi_params = best_rsi[key] # RSI parameters (Window, Upper, Lower)
    rsi_window = rsi_params[0] # get RSI window
    willr_params = best_willr[key] # William%R parameters (Window, overbought, oversold)
    willr_window = willr_params[0] # get WillR window
    natr_params = best_natr[key] # NATR parameters (Window, Volatility Threshold)
    natr_window = natr_params[0]
    # setup initial values
    best_lagged = 0
    best_max_depth = 0
    best_confidence_level = 0
    best_trained_rf = None
    best_sharpe = -float("inf") # start at -inf so the first evaluated combination always becomes the best
    # create a new instance for RandomForest Algo
    rfAlgo = RandomForestAlgo()
    # lagged features - limited to 2-4 as we don't want to end up with high dimensions (+2 steps)
    for i in range(2, 6, 2):
        # max_depth - limited between 4 and 8 to avoid overfitting (+2 steps)
        for j in range(4, 10, 2):
            # confidence level - limited between .55 and .75,
            # as we don't want it to be too high and lose opportunities (+0.1 steps)
            for k in np.arange(0.55, .85, 0.1):
                # print current test
                print("Testing Lagged Features({}) Max Depth({}) Confidence Level({})". format(i, j, k))
                # make copy of dataframe
                train_set = insample_data[key].copy()
                # generate features for RandomForest
                feature_names = rfAlgo.gen_features(data=train_set,
                                                    ema_lead=ema_lead,
                                                    ema_lag=ema_lag,
                                                    sma_lead=sma_lead,
                                                    sma_lag=sma_lag,
                                                    rsi_window=rsi_window,
                                                    willr_window=willr_window,
                                                    natr_window=natr_window,
                                                    lagged_features=i,
                                                    create_y=True)
                # train algorithm
                rf_clf = rfAlgo.train_algo(data=train_set,
                                           features=feature_names,
                                           max_depth=j,
                                           random_state=random_state)
                # evaluate positions by calculating profit and loss
                # evaluation is done on the validation set
                validation_set = validation_data[key].copy()
                # must make sure the validation set has all the features
                # (create_y=False: no labels needed, we only predict here)
                rfAlgo.gen_features(data=validation_set,
                                    ema_lead=ema_lead,
                                    ema_lag=ema_lag,
                                    sma_lead=sma_lead,
                                    sma_lag=sma_lag,
                                    rsi_window=rsi_window,
                                    willr_window=willr_window,
                                    natr_window=natr_window,
                                    lagged_features=i,
                                    create_y=False)
                # generates positions
                rfAlgo.gen_positions(data=validation_set,
                                     clf=rf_clf,
                                     features=feature_names,
                                     confidence=k)
                # evaluate model
                rfAlgo.evaluate(data=validation_set, trading_fee=trading_fee)
                # calculate sharpe ratio
                # 96 (15 minutes in a day) and 365 days for the crypto market
                # we compute the sharpe ratio based on profit and loss
                sharpe_ratio = algo.sharpe_ratio(96*365, validation_set.pl)
                # check if we found a new best
                if sharpe_ratio > best_sharpe:
                    best_lagged = i
                    best_max_depth = j
                    best_confidence_level = k
                    best_trained_rf = rf_clf
                    best_sharpe = sharpe_ratio
    # take reference of best (incl. the fitted classifier itself)
    best_rf[key] = (best_lagged, best_max_depth, best_confidence_level, best_trained_rf)
    # once we found the best parameters
    # test on the test set
    test_set = outsample_data[key].copy()
    # must make sure the test set has all the features
    feature_names = rfAlgo.gen_features(data=test_set,
                                        ema_lead=ema_lead,
                                        ema_lag=ema_lag,
                                        sma_lead=sma_lead,
                                        sma_lag=sma_lag,
                                        rsi_window=rsi_window,
                                        willr_window=willr_window,
                                        natr_window=natr_window,
                                        lagged_features=best_lagged,
                                        create_y=True)
    # show feature importance
    rfAlgo.show_feature_importance(feature_names, best_trained_rf)
    # generates positions
    rfAlgo.gen_positions(data=test_set,
                         clf=best_trained_rf,
                         features=feature_names,
                         confidence=best_confidence_level)
    # evaluate model
    rfAlgo.evaluate(data=test_set, trading_fee=trading_fee)
    # print best params
    print("[BEST] Lagged Features ({}) Max Depth ({}) Confidence Level ({})\n".format(best_lagged,
                                                                                      best_max_depth,
                                                                                      best_confidence_level))
    # print performance
    rfAlgo.stats_perf(data=test_set)
    rfAlgo.evaluate_classifier(best_trained_rf, test_set[feature_names], test_set["y"])
    # set open time as index for plots
    test_set.set_index("open_time", inplace=True)
    # for the position plots we only show the first and last 500
    print("\nFirst 500 poistions taken for: {}".format(key))
    rfAlgo.plot_pos(data=test_set.head(position_plot_n))
    rfAlgo.plot_perf(data=test_set.head(position_plot_n))
    print("Last 500 poistions taken for: {}".format(key))
    rfAlgo.plot_pos(data=test_set.tail(position_plot_n))
    rfAlgo.plot_perf(data=test_set.tail(position_plot_n))
    # plot performance for the whole range
    print("Equity Curve for the whole date range for: {}".format(key))
    rfAlgo.plot_perf(data=test_set)
    print("-------------------------------END--------------------------------------------------------")
###############################################################################
# Logistic Regression Classifier Strategy (Proposed Model #3)
#
# We will tune the following hyperparameters using brute force:
# - Lagged Features
# - Confidence Level
#
###############################################################################
# tune hyperparameters ('brute force method')
# for all the pairs found in the dataset (training dataset)
# we are tuning on sharpe ratio parameter on the validation
for key, value in train_data.items():
    # print the pair which is being tuned
    print("Tuning hyperparams for Logistic Regression strategy using brute force for {}".format(key))
    # get best params for the indicators found in previous experiments
    # for this model we do not need all the best params
    # e.g: we will not need overbought/oversold for WillR
    ema_params = best_ema[key] # EMA parameters (Lead, Lag)
    ema_lead = ema_params[0] # get EMA Lead Window
    ema_lag = ema_params[1] # get EMA Lag Window
    sma_params = best_sma[key] # SMA parameters (Lead, Lag)
    sma_lead = sma_params[0] # get SMA Lead Window
    sma_lag = sma_params[1] # get SMA Lag Window
    rsi_params = best_rsi[key] # RSI parameters (Window, Upper, Lower)
    rsi_window = rsi_params[0] # get RSI window
    willr_params = best_willr[key] # William%R parameters (Window, overbought, oversold)
    willr_window = willr_params[0] # get WillR window
    natr_params = best_natr[key] # NATR parameters (Window, Volatility Threshold)
    natr_window = natr_params[0]
    # setup initial values
    best_lagged = 0
    best_confidence_level = 0
    best_trained_logreg = None
    best_sharpe = -float("inf") # start at -inf so the first evaluated combination always becomes the best
    # create a new instance for LogisticRegression Algo
    logregAlgo = LogRegAlgo()
    # lagged features - limited to 2-4 as we don't want to end up with high dimensions (+2 steps)
    for i in range(2, 6, 2):
        # confidence level - limited between .55 and .65,
        # as we don't want it to be too high and lose opportunities (+0.1 steps)
        for k in np.arange(0.55, .75, 0.1):
            # print current test
            print("Testing Lagged Features({}) Confidence Level({})". format(i, k))
            # make copy of dataframe
            train_set = insample_data[key].copy()
            # generate features for logistic regression
            feature_names = logregAlgo.gen_features(data=train_set,
                                                    ema_lead=ema_lead,
                                                    ema_lag=ema_lag,
                                                    sma_lead=sma_lead,
                                                    sma_lag=sma_lag,
                                                    rsi_window=rsi_window,
                                                    willr_window=willr_window,
                                                    natr_window=natr_window,
                                                    lagged_features=i,
                                                    create_y=True)
            # train algorithm
            logreg_clf = logregAlgo.train_algo(data=train_set,
                                               features=feature_names,
                                               random_state=random_state)
            # evaluation is done on the validation set
            validation_set = validation_data[key].copy()
            # must make sure the validation set has all the features
            # (create_y=False: no labels needed, we only predict here)
            logregAlgo.gen_features(data=validation_set,
                                    ema_lead=ema_lead,
                                    ema_lag=ema_lag,
                                    sma_lead=sma_lead,
                                    sma_lag=sma_lag,
                                    rsi_window=rsi_window,
                                    willr_window=willr_window,
                                    natr_window=natr_window,
                                    lagged_features=i,
                                    create_y=False)
            # generates positions
            logregAlgo.gen_positions(data=validation_set,
                                     clf=logreg_clf,
                                     features=feature_names,
                                     confidence=k)
            # evaluate model
            logregAlgo.evaluate(data=validation_set, trading_fee=trading_fee)
            # calculate sharpe ratio
            # 96 (15 minutes in a day) and 365 days for the crypto market
            # we compute the sharpe ratio based on profit and loss
            sharpe_ratio = algo.sharpe_ratio(96*365, validation_set.pl)
            # check if we found a new best
            # extra condition (unlike the tree models): the strategy must have
            # actually traded (more than one fee event) to be considered
            if (sharpe_ratio > best_sharpe) & (validation_set.apply_fee.sum() > 1):
                best_lagged = i
                best_confidence_level = k
                best_trained_logreg = logreg_clf
                best_sharpe = sharpe_ratio
    # take reference of best (incl. the fitted classifier itself)
    best_logreg[key] = (best_lagged, best_confidence_level, best_trained_logreg)
    # once we found the best parameters
    # test on the test set
    test_set = outsample_data[key].copy()
    # must make sure the test set has all the features
    feature_names = logregAlgo.gen_features(data=test_set,
                                            ema_lead=ema_lead,
                                            ema_lag=ema_lag,
                                            sma_lead=sma_lead,
                                            sma_lag=sma_lag,
                                            rsi_window=rsi_window,
                                            willr_window=willr_window,
                                            natr_window=natr_window,
                                            lagged_features=best_lagged,
                                            create_y=True)
    # print summary of model
    logregAlgo.show_feature_importance(feature_names, best_trained_logreg)
    # generates positions
    logregAlgo.gen_positions(data=test_set,
                             clf=best_trained_logreg,
                             features=feature_names,
                             confidence=best_confidence_level)
    # evaluate model
    logregAlgo.evaluate(data=test_set, trading_fee=trading_fee)
    # print best params
    print("[BEST] Lagged Features ({}) Confidence Level ({})\n".format(best_lagged,
                                                                       best_confidence_level))
    # print performance
    logregAlgo.stats_perf(data=test_set)
    logregAlgo.evaluate_classifier(best_trained_logreg, test_set[feature_names], test_set["y"])
    # set open time as index for plots
    test_set.set_index("open_time", inplace=True)
    # for the position plots we only show the first and last 500
    print("\nFirst 500 poistions taken for: {}".format(key))
    logregAlgo.plot_pos(data=test_set.head(position_plot_n))
    logregAlgo.plot_perf(data=test_set.head(position_plot_n))
    print("Last 500 poistions taken for: {}".format(key))
    logregAlgo.plot_pos(data=test_set.tail(position_plot_n))
    logregAlgo.plot_perf(data=test_set.tail(position_plot_n))
    # plot performance for the whole range
    print("Equity Curve for the whole date range for: {}".format(key))
    logregAlgo.plot_perf(data=test_set)
    print("-------------------------------END--------------------------------------------------------")
Now we will combine the signals from the AI/ML models to try and get a better result. We have already done something similar in a previous experiment, where we combined a set of technical indicators to take a position. In that experiment we (ourselves) defined the rule to take a short or long position based on the signals. This time we will use a different approach to take a position based on the signals generated by the AI/ML strategies. There are many possible ways to configure specific rules to generate a position, and the search space is quite large.
So for this task we will use a meta-heuristic technique to find an optimal trading rule based on these signals. We will make use of a Genetic Algorithm, a meta-heuristic approach inspired by natural selection, the process that drives biological evolution. This approach is further explained in our paper in the context of finding an optimal rule from these signals. Essentially, you can think of this as an optimisation problem where we need to find a rule-based combination which gives us the maximum Sharpe Ratio given a termination condition (in our case, a number of iterations).
###############################################################################
# Setup datasets for this approach.
###############################################################################
# set up validation/test datasets which will be used to find optimal rules prior to the out-of-sample evaluation
# holds the validation set with signals generated from the ML models
ga_validationset = OrderedDict()
# holds the outsample/test set with signals generated from the ML models
ga_testset = OrderedDict()
# create the validation sets with the signals
for key, value in validation_data.items():
    # make copy of the data set
    tmp_dataset = validation_data[key].copy()
    # get best params for the indicators found in previous experiments
    # for this model we do not need all the best params
    # e.g: we will not need overbought/oversold for WillR
    ema_params = best_ema[key] # EMA parameters (Lead, Lag)
    ema_lead = ema_params[0] # get EMA Lead Window
    ema_lag = ema_params[1] # get EMA Lag Window
    sma_params = best_sma[key] # SMA parameters (Lead, Lag)
    sma_lead = sma_params[0] # get SMA Lead Window
    sma_lag = sma_params[1] # get SMA Lag Window
    rsi_params = best_rsi[key] # RSI parameters (Window, Upper, Lower)
    rsi_window = rsi_params[0] # get RSI window
    willr_params = best_willr[key] # William%R parameters (Window, overbought, oversold)
    willr_window = willr_params[0] # get WillR window
    natr_params = best_natr[key] # NATR parameters (Window, Volatility Threshold)
    natr_window = natr_params[0]
    # create three different copies to generate signals for each model
    tmp_dataset_xgb = validation_data[key].copy()
    tmp_dataset_rf = validation_data[key].copy()
    tmp_dataset_logreg = validation_data[key].copy()
    # create instances for the ML strategies
    ga_xgbBoostAlgo = XgbBoostAlgo()
    ga_rfAlgo = RandomForestAlgo()
    ga_logregAlgo = LogRegAlgo()
    # get best parameters for each model
    xgb_params = best_xgb[key] # (best_lagged, best_max_depth, best_confidence_level, best_trained_xgb)
    rf_params = best_rf[key] # (best_lagged, best_max_depth, best_confidence_level, best_trained_rf)
    logreg_params = best_logreg[key] # (best_lagged, best_confidence_level, best_trained_logreg)
    # generate positions for each model
    # gen features for XGBboost
    xgb_feature_names = ga_xgbBoostAlgo.gen_features(data=tmp_dataset_xgb,
                                                     ema_lead=ema_lead,
                                                     ema_lag=ema_lag,
                                                     sma_lead=sma_lead,
                                                     sma_lag=sma_lag,
                                                     rsi_window=rsi_window,
                                                     willr_window=willr_window,
                                                     natr_window=natr_window,
                                                     lagged_features=xgb_params[0],
                                                     create_y=True)
    # generate positions for XGBoost
    ga_xgbBoostAlgo.gen_positions(data=tmp_dataset_xgb,
                                  clf=xgb_params[3],
                                  features=xgb_feature_names,
                                  confidence=xgb_params[2])
    # for this one we call the evaluate so we copy the log returns
    ga_xgbBoostAlgo.evaluate(data=tmp_dataset_xgb, trading_fee=trading_fee)
    # append XGB boost signals (log_returns only needs to be copied once;
    # it is identical regardless of which model produced it)
    tmp_dataset["xgboost_signal"] = tmp_dataset_xgb["position"]
    tmp_dataset["log_returns"] = tmp_dataset_xgb["log_returns"]
    # gen features for random forest
    rf_feature_names = ga_rfAlgo.gen_features(data=tmp_dataset_rf,
                                              ema_lead=ema_lead,
                                              ema_lag=ema_lag,
                                              sma_lead=sma_lead,
                                              sma_lag=sma_lag,
                                              rsi_window=rsi_window,
                                              willr_window=willr_window,
                                              natr_window=natr_window,
                                              lagged_features=rf_params[0],
                                              create_y=True)
    # generates positions
    ga_rfAlgo.gen_positions(data=tmp_dataset_rf,
                            clf=rf_params[3],
                            features=rf_feature_names,
                            confidence=rf_params[2])
    # append RF signals
    tmp_dataset["rf_signal"] = tmp_dataset_rf["position"]
    # gen features for logistic regression
    logreg_feature_names = ga_logregAlgo.gen_features(data=tmp_dataset_logreg,
                                                      ema_lead=ema_lead,
                                                      ema_lag=ema_lag,
                                                      sma_lead=sma_lead,
                                                      sma_lag=sma_lag,
                                                      rsi_window=rsi_window,
                                                      willr_window=willr_window,
                                                      natr_window=natr_window,
                                                      lagged_features=logreg_params[0],
                                                      create_y=True)
    # generates positions for logistic regression
    ga_logregAlgo.gen_positions(data=tmp_dataset_logreg,
                                clf=logreg_params[2],
                                features=logreg_feature_names,
                                confidence=logreg_params[1])
    # append logreg signals
    tmp_dataset["logreg_signal"] = tmp_dataset_logreg["position"]
    # add dataset with signals from each model to ga_validationset
    # NOTE(review): NaN rows are kept here, while the test-set counterpart
    # drops them via dropna() — confirm this asymmetry is intentional
    ga_validationset[key] = tmp_dataset
# create the test sets with the signals
for key, value in outsample_data.items():
    # make copy of the data set
    tmp_dataset = outsample_data[key].copy()
    # get best params for the indicators found in previous experiments
    # for this model we do not need all the best params
    # e.g: we will not need overbought/oversold for WillR
    ema_params = best_ema[key] # EMA parameters (Lead, Lag)
    ema_lead = ema_params[0] # get EMA Lead Window
    ema_lag = ema_params[1] # get EMA Lag Window
    sma_params = best_sma[key] # SMA parameters (Lead, Lag)
    sma_lead = sma_params[0] # get SMA Lead Window
    sma_lag = sma_params[1] # get SMA Lag Window
    rsi_params = best_rsi[key] # RSI parameters (Window, Upper, Lower)
    rsi_window = rsi_params[0] # get RSI window
    willr_params = best_willr[key] # William%R parameters (Window, overbought, oversold)
    willr_window = willr_params[0] # get WillR window
    natr_params = best_natr[key] # NATR parameters (Window, Volatility Threshold)
    natr_window = natr_params[0]
    # create three different copies to generate signals for each model
    tmp_dataset_xgb = outsample_data[key].copy()
    tmp_dataset_rf = outsample_data[key].copy()
    tmp_dataset_logreg = outsample_data[key].copy()
    # create instances for the ML strategies
    ga_xgbBoostAlgo = XgbBoostAlgo()
    ga_rfAlgo = RandomForestAlgo()
    ga_logregAlgo = LogRegAlgo()
    # get best parameters for each model
    xgb_params = best_xgb[key] # (best_lagged, best_max_depth, best_confidence_level, best_trained_xgb)
    rf_params = best_rf[key] # (best_lagged, best_max_depth, best_confidence_level, best_trained_rf)
    logreg_params = best_logreg[key] # (best_lagged, best_confidence_level, best_trained_logreg)
    # generate positions for each model
    # gen features for XGBboost
    xgb_feature_names = ga_xgbBoostAlgo.gen_features(data=tmp_dataset_xgb,
                                                     ema_lead=ema_lead,
                                                     ema_lag=ema_lag,
                                                     sma_lead=sma_lead,
                                                     sma_lag=sma_lag,
                                                     rsi_window=rsi_window,
                                                     willr_window=willr_window,
                                                     natr_window=natr_window,
                                                     lagged_features=xgb_params[0],
                                                     create_y=True)
    # generate positions for XGBoost
    ga_xgbBoostAlgo.gen_positions(data=tmp_dataset_xgb,
                                  clf=xgb_params[3],
                                  features=xgb_feature_names,
                                  confidence=xgb_params[2])
    # for this one we call the evaluate so we copy the log returns
    ga_xgbBoostAlgo.evaluate(data=tmp_dataset_xgb, trading_fee=trading_fee)
    # append XGB boost signals (log_returns only needs to be copied once;
    # it is identical regardless of which model produced it)
    tmp_dataset["xgboost_signal"] = tmp_dataset_xgb["position"]
    tmp_dataset["log_returns"] = tmp_dataset_xgb["log_returns"]
    # gen features for random forest
    rf_feature_names = ga_rfAlgo.gen_features(data=tmp_dataset_rf,
                                              ema_lead=ema_lead,
                                              ema_lag=ema_lag,
                                              sma_lead=sma_lead,
                                              sma_lag=sma_lag,
                                              rsi_window=rsi_window,
                                              willr_window=willr_window,
                                              natr_window=natr_window,
                                              lagged_features=rf_params[0],
                                              create_y=True)
    # generates positions
    ga_rfAlgo.gen_positions(data=tmp_dataset_rf,
                            clf=rf_params[3],
                            features=rf_feature_names,
                            confidence=rf_params[2])
    # append RF signals
    tmp_dataset["rf_signal"] = tmp_dataset_rf["position"]
    # gen features for logistic regression
    logreg_feature_names = ga_logregAlgo.gen_features(data=tmp_dataset_logreg,
                                                      ema_lead=ema_lead,
                                                      ema_lag=ema_lag,
                                                      sma_lead=sma_lead,
                                                      sma_lag=sma_lag,
                                                      rsi_window=rsi_window,
                                                      willr_window=willr_window,
                                                      natr_window=natr_window,
                                                      lagged_features=logreg_params[0],
                                                      create_y=True)
    # generates positions for logistic regression
    ga_logregAlgo.gen_positions(data=tmp_dataset_logreg,
                                clf=logreg_params[2],
                                features=logreg_feature_names,
                                confidence=logreg_params[1])
    # append logreg signals
    tmp_dataset["logreg_signal"] = tmp_dataset_logreg["position"]
    # add dataset with signals from each model to
    # ga_testset so we would be able to test our GA optimal rules once found
    # (dropna: rows with incomplete indicator look-back windows are removed)
    ga_testset[key] = tmp_dataset.dropna()
###############################################################################
# Generate position based on the encoding passed.
###############################################################################
def trade_signal(data, encoding):
    """Derive long/short positions by combining the three ML model signals
    according to a 7-bit GA encoding.

    Parameters
    ----------
    data : DataFrame with `xgboost_signal`, `rf_signal`, `logreg_signal`
        columns (1.0 = long). Mutated in place with helper columns and the
        final `position` column (1.0 long / -1.0 short).
    encoding : sequence of 7 bits:
        |xgboost|AND-OR-XOR|randomforest|AND-OR-XOR|logreg|
        |model  |operator  |model       |operator  |model |
        models: 0 => model signal, 1 => not(model signal) [negate]
        operators (2 bits): 00 => and, 01 => or, 10 => xor, 11 => or

    Returns
    -------
    (data, error) where error is 1 on success (-1 would flag a bad encoding).
    """
    # binarise each model signal: 1 when the model went long, 0 otherwise
    data["ga_xgboost"] = np.where(data["xgboost_signal"] == 1.0, 1, 0)
    data["ga_rf"] = np.where(data["rf_signal"] == 1.0, 1, 0)
    data["ga_logreg"] = np.where(data["logreg_signal"] == 1.0, 1, 0)
    # check negate encoding: flip 0 <-> 1 with `1 - x`
    # (bitwise `~` on an integer series would yield -1/-2 and break the
    # boolean algebra and the `position == 0` test below)
    if encoding[0] == 1:
        data["ga_xgboost"] = 1 - data["ga_xgboost"]
    if encoding[3] == 1:
        data["ga_rf"] = 1 - data["ga_rf"]
    if encoding[6] == 1:
        data["ga_logreg"] = 1 - data["ga_logreg"]
    # first operator: combine xgboost with random forest
    # NOTE: the two bit tests must be joined with `and`;
    # `encoding[1] == 0 & encoding[2] == 0` parses as
    # `encoding[1] == (0 & encoding[2]) == 0` and silently ignores one bit
    if encoding[1] == 0 and encoding[2] == 0:  # we will use the and operator
        data["tmp_signal"] = data["ga_xgboost"] & data["ga_rf"]
    elif encoding[1] == 0 and encoding[2] == 1:  # we will use the or operator
        data["tmp_signal"] = data["ga_xgboost"] | data["ga_rf"]
    elif encoding[1] == 1 and encoding[2] == 0:  # we will use the xor operator
        data["tmp_signal"] = data["ga_xgboost"] ^ data["ga_rf"]
    else:  # 11 => or (per the encoding table)
        data["tmp_signal"] = data["ga_xgboost"] | data["ga_rf"]
    # second operator: combine the intermediate signal with logistic regression
    if encoding[4] == 0 and encoding[5] == 0:  # we will use the and operator
        data["position"] = data['tmp_signal'] & data['ga_logreg']
    elif encoding[4] == 0 and encoding[5] == 1:  # we will use the or operator
        data["position"] = data['tmp_signal'] | data['ga_logreg']
    elif encoding[4] == 1 and encoding[5] == 0:  # we will use the xor operator
        data["position"] = data['tmp_signal'] ^ data['ga_logreg']
    else:  # 11 => or (per the encoding table)
        data["position"] = data['tmp_signal'] | data['ga_logreg']
    # map the combined boolean to short (-1.0) and long (1.0) positions
    data["position"] = np.where(data["position"] == 0, -1.0, 1.0)
    return data, 1
###############################################################################
# Compute fitness for a particular encoding based on Sharpe Ratio.
###############################################################################
# fitness for an individual only
def fitness(data, encoding, trading_fee):
    """Fitness of one GA individual: the annualised Sharpe ratio obtained by
    trading the positions its encoding produces on `data` (mutated in place)."""
    # generate signals from encoding
    data, error = trade_signal(data, encoding)
    # there is an error with the encoding -> neutral fitness of 0
    if error == -1:
        return 0
    # first check where to apply fee based on the position generated by the encoding
    # basically we check for any changes between positions
    # (a bar whose position differs from either neighbour is an entry or exit)
    # except when we are not in a trade at all (position is equal to 0)
    data["apply_fee"] = 0
    data["apply_fee"] = ((data.position != data.position.shift(-1)) | (data.position != data.position.shift(1))).astype(int)
    data.loc[data.position == 0, "apply_fee"] = 0
    # if you take a position and you close it immediately apply
    # the fee *2 which means you entered and exited at the same candle
    # eg. 0 1 0 or -1 1 -1 or 1 -1 1
    data.loc[(data.position != 0) & (data.position.shift(1) != data.position ) & (data.position.shift(-1) != data.position) , "apply_fee"] = 2
    # calculate profit and loss + tx cost
    # (previous bar's position earns this bar's log return, minus fee cost)
    data["pl"] = (data["position"].shift(1) * data["log_returns"]) - ((data.apply_fee.shift(1) * trading_fee) * np.abs(data["log_returns"]))
    # compute sharpe ratio
    # 96 (15 minute bars in a day) and 365 days for the crypto market
    # we compute the sharpe ratio based on profit and loss
    sharpe_ratio = fs.sharpe_ratio(96*365, data.pl)
    return sharpe_ratio
# fitness for the whole population
def population_fitness(data, pop, c):
    """Evaluate the fitness of every individual (row) in the population."""
    # one Sharpe-based score per individual; float dtype matches np.zeros
    return np.array([fitness(data, individual, c) for individual in pop],
                    dtype=float)
###############################################################################
# Selection algorithm to select parents.
###############################################################################
# reference:
# https://stackoverflow.com/questions/10324015/fitness-proportionate-selection-roulette-wheel-selection-in-python
def selection(f, size):
    """Roulette-wheel (fitness-proportionate) selection.

    Parameters
    ----------
    f : sequence of fitness scores, one per individual.
    size : number of individuals eligible for selection (prefix of ``f``).

    Returns
    -------
    int -- index of the selected individual.

    NOTE(review): like the original, this assumes non-negative fitness
    values; negative Sharpe ratios would distort the wheel — TODO confirm
    upstream clamping.
    """
    total = sum(f[:size])
    # bug fix: the wheel must be spun over [0, total fitness), not
    # [0, population size) — `total` was previously computed but unused
    pick = random.uniform(0, total)
    current = 0
    # bug fix: honour the `size` argument instead of the global `pop_size`
    for index in range(size):
        current += f[index]
        if current > pick:
            return index
    # floating-point edge case (pick == total) or all-zero fitness:
    # fall back to the last eligible index instead of returning None
    return size - 1
###############################################################################
# Functions to evaluate the best individual.
###############################################################################
# apply fee and compute profit and loss
def evaluate(data, trading_fee):
    """Apply transaction fees and compute (cumulative) profit and loss.

    Mutates ``data`` in place, adding the columns ``apply_fee``, ``pl``
    and ``cum_pl``.

    Parameters
    ----------
    data : DataFrame with ``position`` (-1/0/1) and ``log_returns`` columns.
    trading_fee : proportional fee charged each time a position changes.
    """
    # a fee applies on every candle whose position differs from either
    # neighbour, except while flat (position == 0)
    # (removed a dead `data["apply_fee"] = 0` that was immediately overwritten)
    data["apply_fee"] = ((data.position != data.position.shift(-1)) |
                         (data.position != data.position.shift(1))).astype(int)
    data.loc[data.position == 0, "apply_fee"] = 0
    # entering and exiting within the same candle (e.g. 0 1 0, -1 1 -1,
    # 1 -1 1) incurs the fee twice: one entry plus one exit
    data.loc[(data.position != 0) &
             (data.position.shift(1) != data.position) &
             (data.position.shift(-1) != data.position), "apply_fee"] = 2
    # profit and loss net of transaction cost
    data["pl"] = (data["position"].shift(1) * data["log_returns"]) - \
                 ((data.apply_fee.shift(1) * trading_fee) * np.abs(data["log_returns"]))
    # cumulative profit and loss (equity curve in log-return space)
    data["cum_pl"] = data["pl"].cumsum()
# plot positions
def plot_pos(data):
    """Plot the close price and the trading positions taken (two stacked panels)."""
    fig = plt.figure(figsize=(15, 9))
    # top panel: closing price
    ax = fig.add_subplot(2, 1, 1)
    ax.plot(data["close"], label="Close Price")
    ax.set_ylabel("USDT")
    ax.legend(loc="best")
    ax.grid()
    # bottom panel: long/short position (-1 / 1)
    ax = fig.add_subplot(2, 1, 2)
    ax.plot(data["position"], label="Trading position")
    ax.set_ylabel("Trading Position")
    ax.set_ylim([-1.5, 1.5])
    # bug fix: the line's label was never shown because legend() was not
    # called on this axis; grid added to match the price panel
    ax.legend(loc="best")
    ax.grid()
    # show plot
    plt.show()
# plot equity curve
def plot_perf(data):
    """Plot the cumulative profit-and-loss (equity) curve."""
    # pandas returns the axes it drew on; label it via the axes API
    ax = data["cum_pl"].plot(label="Equity Curve", figsize=(15, 8))
    ax.set_xlabel("Date")
    ax.set_ylabel("Cumulative Returns")
    ax.legend()
    plt.show()
# show performance statistics
def stats_perf(data):
    """Print performance statistics for a backtest run (date range, trade
    counts, log-return P&L extremes and annualised Sharpe Ratio)."""
    # date range covered by the data, expressed in whole days
    first_row = data.head(1)
    last_row = data.tail(1)
    span = last_row["close_time"].values[0] - first_row["open_time"].values[0]
    n_days = int(round(span / np.timedelta64(1, 'D')))
    print("From {} to {} ({} days)\n".format(first_row["open_time"].astype(str).values[0],
                                             last_row["close_time"].astype(str).values[0],
                                             n_days))
    # every applied fee corresponds to one trade, so summing counts trades
    n_trades = data.apply_fee.sum()
    print("Total number of trades: {}".format(n_trades))
    print("Avg. trades per day: {}".format(round(n_trades / n_days, 2)))
    # cumulative profit/loss (log returns), final / best / worst points
    print("Profit/Loss [Log Return]: {0}%".format(round(data["cum_pl"].iloc[-1] * 100, 2)))
    print("Maximum Gain: {0}%".format(round(data["cum_pl"].max() * 100, 2)))
    print("Maximum Drawdown: {0}%".format(round(data["cum_pl"].min() * 100, 2)))
    # Sharpe Ratio annualised over 96 fifteen-minute candles per day,
    # 365 days (crypto market trades every day)
    annualised_sharpe = fs.sharpe_ratio(96*365, data.pl)
    print("Annualised Sharpe Ratio: {0}".format(round(annualised_sharpe, 6)))
# hyperparameters for GA
pop_size = 30
epochs = 200            # total generations before termination
mutation_rate = 0.08    # probability that a single bit mutates
crossover_rate = 0.75   # probability that a crossover occurs
# find optimal rules in all the crypto currency pairs
for key, value in ga_validationset.items():
    print("Finding Optimal Trading Rules using GA for {}".format(key))
    tmp_validation = ga_validationset[key].copy()
    # create an initial population of random 7-bit encodings
    population = rng.randint(2, size=(pop_size, 7))
    pop_fitness = population_fitness(tmp_validation, population, trading_fee)
    # start GA Algorithm
    for i in range(epochs):
        parent_pool_size = population.shape[0]
        # each of the pop_size pairings produces two offspring
        next_population = np.zeros((pop_size * 2, 7), dtype=bool)
        for pair in range(pop_size):
            # selection phase -> pick two parents via roulette wheel
            p1 = population[selection(pop_fitness, parent_pool_size), :]
            p2 = population[selection(pop_fitness, parent_pool_size), :]
            # bug fix: write each pair's children to dedicated rows
            # (2*pair, 2*pair + 1) — the original `p = p + 1` inside
            # `for p in range(pop_size)` made consecutive iterations
            # overwrite each other's second child and left the top half
            # of next_population as all-zero encodings
            c1, c2 = 2 * pair, 2 * pair + 1
            if random.random() < crossover_rate:
                # single-point crossover: swap the tails at a random index
                crossover_point = rng.randint(7, size=1)[0]
                next_population[c1, 0:crossover_point] = p1[0:crossover_point]
                next_population[c1, crossover_point:] = p2[crossover_point:]
                next_population[c2, 0:crossover_point] = p2[0:crossover_point]
                next_population[c2, crossover_point:] = p1[crossover_point:]
            else:
                # no crossover: children are clones of the parents
                next_population[c1, :] = p1
                next_population[c2, :] = p2
        # bit-flip mutation applied if random is below the mutation rate
        # bug fix: mutate ALL 2*pop_size offspring, not only the first half
        for p in range(pop_size * 2):
            for j in range(7):
                if random.random() < mutation_rate:
                    next_population[p, j] = ~ next_population[p, j]
        # set new population to current population
        population = next_population
        # calculate fitness, sort descending and keep the fittest pop_size
        pop_fitness = population_fitness(tmp_validation, population, trading_fee)
        fit_order = pop_fitness.argsort()[::-1]
        pop_fitness = pop_fitness[fit_order]
        population = population[fit_order]
        population = population[0:pop_size, :]
        pop_fitness = pop_fitness[0:pop_size]
    # get best trading rule found (optimised on Sharpe Ratio)
    best_encoding = population[0, :]
    # make copy of the test set to evaluate on unseen data
    tmp_test_set = ga_testset[key].copy()
    print("Best encoding found: {}".format(best_encoding.astype(int)))
    # apply the trading rules and create positions
    tmp_data = trade_signal(tmp_test_set, best_encoding)[0]
    # evaluate on the test data and show performance statistics
    evaluate(tmp_data, trading_fee)
    stats_perf(tmp_data)
    # set open time as index for plots
    tmp_data.set_index("open_time", inplace=True)
    # for the position plots we only show the first and last 500 candles
    print("\nFirst 500 positions taken for: {}".format(key))
    plot_pos(data=tmp_data.head(position_plot_n))
    plot_perf(data=tmp_data.head(position_plot_n))
    print("Last 500 positions taken for: {}".format(key))
    plot_pos(data=tmp_data.tail(position_plot_n))
    plot_perf(data=tmp_data.tail(position_plot_n))
    # plot performance for the whole range
    print("Equity Curve for the whole date range for: {}".format(key))
    plot_perf(data=tmp_data)
    print("-------------------------------END--------------------------------------------------------")